/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Coprocessor for flow run table.
 */
public class FlowRunCoprocessor extends BaseRegionObserver {

  private static final Logger LOG =
      LoggerFactory.getLogger(FlowRunCoprocessor.class);

  private Region region;
  /**
   * generate a timestamp that is unique per row in a region this is per region.
   */
  private final TimestampGenerator timestampGenerator =
      new TimestampGenerator();

  @Override
  public void start(CoprocessorEnvironment e) throws IOException {
    if (e instanceof RegionCoprocessorEnvironment) {
      RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
      this.region = env.getRegion();
    }
  }

  /*
   * (non-Javadoc)
   *
   * This method adds the tags onto the cells in the Put. It is presumed that
   * all the cells in one Put have the same set of Tags. The existing cell
   * timestamp is overwritten for non-metric cells and each such cell gets a new
   * unique timestamp generated by {@link TimestampGenerator}
   *
   * @see
   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache
   * .hadoop.hbase.coprocessor.ObserverContext,
   * org.apache.hadoop.hbase.client.Put,
   * org.apache.hadoop.hbase.regionserver.wal.WALEdit,
   * org.apache.hadoop.hbase.client.Durability)
   */
  @Override
  public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
      WALEdit edit, Durability durability) throws IOException {
    Map<String, byte[]> attributes = put.getAttributesMap();
    // Assumption is that all the cells in a put are the same operation.
    List<Tag> tags = new ArrayList<>();
    if ((attributes != null) && (attributes.size() > 0)) {
      for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
        Tag t = HBaseTimelineStorageUtils.getTagFromAttribute(attribute);
        tags.add(t);
      }
      byte[] tagByteArray = Tag.fromList(tags);
      NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
          Bytes.BYTES_COMPARATOR);
      for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()
          .entrySet()) {
        List<Cell> newCells = new ArrayList<>(entry.getValue().size());
        for (Cell cell : entry.getValue()) {
          // for each cell in the put add the tags
          // Assumption is that all the cells in
          // one put are the same operation
          // also, get a unique cell timestamp for non-metric cells
          // this way we don't inadvertently overwrite cell versions
          long cellTimestamp = getCellTimestamp(cell.getTimestamp(), tags);
          newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell),
              CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
              cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell),
              tagByteArray));
        }
        newFamilyMap.put(entry.getKey(), newCells);
      } // for each entry
      // Update the family map for the Put
      put.setFamilyCellMap(newFamilyMap);
    }
  }

  /**
   * Determines if the current cell's timestamp is to be used or a new unique
   * cell timestamp is to be used. The reason this is done is to inadvertently
   * overwrite cells when writes come in very fast. But for metric cells, the
   * cell timestamp signifies the metric timestamp. Hence we don't want to
   * overwrite it.
   *
   * @param timestamp
   * @param tags
   * @return cell timestamp
   */
  private long getCellTimestamp(long timestamp, List<Tag> tags) {
    // if ts not set (hbase sets to HConstants.LATEST_TIMESTAMP by default)
    // then use the generator
    if (timestamp == HConstants.LATEST_TIMESTAMP) {
      return timestampGenerator.getUniqueTimestamp();
    } else {
      return timestamp;
    }
  }

  /*
   * (non-Javadoc)
   *
   * Creates a {@link FlowScanner} Scan so that it can correctly process the
   * contents of {@link FlowRunTable}.
   *
   * @see
   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache
   * .hadoop.hbase.coprocessor.ObserverContext,
   * org.apache.hadoop.hbase.client.Get, java.util.List)
   */
  @Override
  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
      Get get, List<Cell> results) throws IOException {
    Scan scan = new Scan(get);
    scan.setMaxVersions();
    RegionScanner scanner = null;
    try {
      scanner = new FlowScanner(e.getEnvironment(), scan,
          region.getScanner(scan), FlowScannerOperation.READ);
      scanner.next(results);
      e.bypass();
    } finally {
      if (scanner != null) {
        scanner.close();
      }
    }
  }

  /*
   * (non-Javadoc)
   *
   * Ensures that max versions are set for the Scan so that metrics can be
   * correctly aggregated and min/max can be correctly determined.
   *
   * @see
   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org
   * .apache.hadoop.hbase.coprocessor.ObserverContext,
   * org.apache.hadoop.hbase.client.Scan,
   * org.apache.hadoop.hbase.regionserver.RegionScanner)
   */
  @Override
  public RegionScanner preScannerOpen(
      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
      RegionScanner scanner) throws IOException {
    // set max versions for scan to see all
    // versions to aggregate for metrics
    scan.setMaxVersions();
    return scanner;
  }

  /*
   * (non-Javadoc)
   *
   * Creates a {@link FlowScanner} Scan so that it can correctly process the
   * contents of {@link FlowRunTable}.
   *
   * @see
   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen(
   * org.apache.hadoop.hbase.coprocessor.ObserverContext,
   * org.apache.hadoop.hbase.client.Scan,
   * org.apache.hadoop.hbase.regionserver.RegionScanner)
   */
  @Override
  public RegionScanner postScannerOpen(
      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
      RegionScanner scanner) throws IOException {
    return new FlowScanner(e.getEnvironment(), scan,
        scanner, FlowScannerOperation.READ);
  }

  @Override
  public InternalScanner preFlush(
      ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      InternalScanner scanner) throws IOException {
    if (LOG.isDebugEnabled()) {
      if (store != null) {
        LOG.debug("preFlush store = " + store.getColumnFamilyName()
            + " flushableSize=" + store.getFlushableSize()
            + " flushedCellsCount=" + store.getFlushedCellsCount()
            + " compactedCellsCount=" + store.getCompactedCellsCount()
            + " majorCompactedCellsCount="
            + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
            + store.getMemstoreFlushSize() + " memstoreSize="
            + store.getMemStoreSize() + " size=" + store.getSize()
            + " storeFilesCount=" + store.getStorefilesCount());
      }
    }
    return new FlowScanner(c.getEnvironment(), scanner,
        FlowScannerOperation.FLUSH);
  }

  @Override
  public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
      Store store, StoreFile resultFile) {
    if (LOG.isDebugEnabled()) {
      if (store != null) {
        LOG.debug("postFlush store = " + store.getColumnFamilyName()
            + " flushableSize=" + store.getFlushableSize()
            + " flushedCellsCount=" + store.getFlushedCellsCount()
            + " compactedCellsCount=" + store.getCompactedCellsCount()
            + " majorCompactedCellsCount="
            + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
            + store.getMemstoreFlushSize() + " memstoreSize="
            + store.getMemStoreSize() + " size=" + store.getSize()
            + " storeFilesCount=" + store.getStorefilesCount());
      }
    }
  }

  @Override
  public InternalScanner preCompact(
      ObserverContext<RegionCoprocessorEnvironment> e, Store store,
      InternalScanner scanner, ScanType scanType, CompactionRequest request)
      throws IOException {

    FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
    if (request != null) {
      requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
          : FlowScannerOperation.MINOR_COMPACTION);
      LOG.info("Compactionrequest= " + request.toString() + " "
          + requestOp.toString() + " RegionName=" + e.getEnvironment()
              .getRegion().getRegionInfo().getRegionNameAsString());
    }
    return new FlowScanner(e.getEnvironment(), scanner, requestOp);
  }
}
